iT邦幫忙

2023 iThome 鐵人賽

DAY 12
0

前端大部分接觸到的資料格式最常見的就是 json,而 Stream 對前端來說應該是個比較陌生的概念,但每次對後端呼叫 api 取得資料的時候,回來的資料格式就是 Stream,以下就從最常使用的 fetch 來了解什麼是前端的 Stream

我睡覺都會玩 pokemon sleep 蒐集神奇寶貝(我終於抓到伊布一起睡了150小時了),所以讓我從獲得神奇寶貝資訊的 api 開始當做範例,介紹 Stream 在其中所扮演的角色。

以下是利用 pokemon api 取得百變怪的相關資料:

const response = await fetch('https://pokeapi.co/api/v2/pokemon/ditto');
const result = await response.json();
console.log(result);

result 可以拿到百變怪的能力、名稱、屬性、種類等等的資訊,而這些資訊都是以 json 格式來呈現的,這邊的第二行還執行了 await res.json(),將 Response 轉換成 json 格式,但除了 json 格式外,fetch 也支援許多其他的轉換方式 (res.arrayBuffer(), res.blob(), res.formData(), res.text()),藉由這些轉換函式可以讓原先的 Response 轉換成各樣的數據類型,而呼叫 api 拿回來的 Response 本身就是今天要討論的主角 Stream

Stream 的起源

現在網路上有許多的圖片、文件、影音都是可以任由大家下載觀看的,但在久遠的 Javascript 時代,如果你想要下載一個很大的文件或影片,必須要等到瀏覽器將整個檔案下載下來,再經由**反序列化(deserialize)**轉換成需要的格式,而這樣會造成一些麻煩的問題,例如:

  • 要等待整個檔案下載完才能進行操作,如果是在線上要看影片的話,需要等待整部影片都載完才能開始觀看,完全不是好的使用者體驗
  • 如果在網路下載的過程中,網路不穩斷線了,整個檔案都需要重新下載

所以後來才出現了 Stream 的概念,可以將網路下載來的檔案分成一段一段處理,而不是像以往的一整塊綁在一起,而由於整個檔案都切分成很多的一小段,Stream 也可以做到控制什麼時候開始或取消檔案下載,以及網路中斷後支援斷點續傳的功能。

Stream 的傳遞過程

https://ithelp.ithome.com.tw/upload/images/20230926/20162687TSyiWFOTKe.png

Stream 傳遞的過程中,有一個重要的概念叫做 chunk,我們知道 Stream 可能是一個很大的檔案,而分批處理這個檔案的單位就被叫做 chunk (圖中的三角形 data),每個 chunk 會被排入隊列(enqueued),等待被上圖中間的 consumer 操作,像上圖中的 consumer 類型為 Reader(下面講解 ReadableStream 時會再說明),而在 consumer 操作的過程中,整個 Stream 會被鎖住(lock),以防止其他 consumer 操作資料造成不同步的問題,最後 consumer 會將 Stream 轉為不同的資料型態然後往上圖右側繼續傳遞。

Stream 的種類

Stream 根據讀寫的狀況還分成幾種,這裡我們先介紹常見的三種

  • ReadableStream
    可以被讀取的 Stream,上面提到 fecth api 回傳的 Response 就是這一種
  • WritableStream
    用來寫入的 Stream
  • TransformStream
    可以讀取又可以寫入的 Stream,主要作為中間流負責轉換資料

ReadableStream

讀取 Stream

首先執行 getReader 會創建出一個可讀的 reader

const reader = response.body.getReader();

接著會循環的呼叫 reader.read(),依序將 Stream 裡的數據讀取出來,reader.read() 會回傳兩個參數,其中 valueUint8Array 型別,代表這次讀取所得到的 Stream 片段,而 done 則標誌著 Stream 是否全部讀取完畢。

while (true) {
  const {value, done} = await reader.read();
  if (done) break;
  console.log('Received', value);
}

console.log('Response fully received');

讀取 Stream 為 ArrayBuffer

結合以上的方法,可以實作一個簡單的 toBuffer 函式將百變怪的資訊以 Uint8Array 的結果讀取出來:

const res = await fetch('https://pokeapi.co/api/v2/pokemon/ditto');
const reader = body.getReader();
const buffer = await toBuffer(reader);
console.log('Received buffer data', buffer); // [Uint8Array(23286)]

async function toBuffer(reader) {
  const chunk = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;

    chunk.push(value);
  }

  return chunk;
}

讀取 Stream 為 JSON

但在這個獲取寶可夢的例子上,我們需要的資料格式不是 ArrayBuffer 而是 JSON,要將 Stream 轉換為文本格式的話,我們需要用到的是另一個 web api - TextDecoder (補充小知識 1.),利用 TextDecoder 可以很輕鬆的將 Stream 轉換成想要的文本格式

const res = await fetch('https://pokeapi.co/api/v2/pokemon/ditto');
const reader = body.getReader();
const json = await toJSON(reader);
console.log('Received json data', json);

async function toJSON(reader) {
  const decoder = new TextDecoder();
  const chunks = [];

  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      break;
    }
    
    // 將原本 Stream 型態的 value 一段一段的解碼為文本格式 chunk
    const chunk = decoder.decode(value, { stream: true });
    chunks.push(chunk);
  }

  return JSON.parse(chunks.join(''));
}

WritableStream

建構 WritableStream

WritableStream 可以實例化建立 (事實上 ReadableStream 也有一樣的用法,但有點太過複雜以上沒有提到),而在執行 new WritableStream() 時丟入的第一個參數可以定義 Stream 在不同生命週期執行的函式:

  • start(controller)
    WritableStream 被建立時只會執行一次,主要可以拿來處理初始化需要的邏輯
  • write(chunk,controller)
    當有新的 chunk 進來時就會被呼叫,所以在寫入的過程中,這個函式會一直被多次執行
  • close(controller)
    當沒有新的 chunk 進來時,代表整個寫入階段結束時執行
  • abort(reason)
    當在寫入途中發生任何問題時執行,類似 error handling 機制
const stream = new WritableStream(
  {
    start(controller) {},
    write(chunk, controller) {},
    close(controller) {},
    abort(reason) {},
  },
);

以下利用 MDN 中的 Simple writer example 範例,稍微了解一下 WritableStream 的使用方式。
這個例子展示的是將 "Hello, world" 這段文字先編碼成 ArrayBuffer 丟進 WritableStream 中寫入,接著再從 write(chunk, controller) 中將各字母解碼後並顯示在畫面上。

https://ithelp.ithome.com.tw/upload/images/20230926/20162687sscuvTvnFD.png

第一步先實例化 WritableStream 並在每次接受到 chunk 寫入時,將原本的 ArrayBuffer 解碼後獲取每個單字字母並渲染到畫面

const writableStream = new WritableStream(
  {
    write(chunk) {
      return new Promise((resolve, reject) => {
        const buffer = new ArrayBuffer(1);
        const view = new Uint8Array(buffer);
        view[0] = chunk;
        
        // 將 ArrayBuffer 解碼回英文字母
        const decoder = new TextDecoder();
        const decoded = decoder.decode(view, { stream: true });
        
        // 準備將字母渲染回畫面上
        const listItem = document.createElement("li");
        listItem.textContent = `Chunk decoded: ${decoded}`;
        list.appendChild(listItem);
        result += decoded;
        resolve();
      });
    },
  },
);

然後執行 sendMessage 函式,把原本的整個句子編碼成 ArrayBuffer 後寫入到 WritableStream

sendMessage("Hello, world.", writableStream);
function sendMessage(message, writableStream) {
  const defaultWriter = writableStream.getWriter();
  
  // 把原本的整個句子編碼成 ArrayBuffer
  const encoder = new TextEncoder();
  const encoded = encoder.encode(message, { stream: true });
  
  // 將編碼後的 ArrayBuffer 寫入 WritableStream
  encoded.forEach((chunk) => {
    defaultWriter.ready
      .then(() => defaultWriter.write(chunk))
      .then(() => console.log("Chunk written to sink."))
      .catch((err) => console.error("Chunk error:", err));
  });
  
  // 再度呼叫 .ready 以保證在關閉 Stream 前,所有的 chunks 都被寫入
  defaultWriter.ready
    .then(() => defaultWriter.close())
    .then(() => console.log("All chunks written"))
    .catch((err) => console.error("Stream error:", err));
}

這裡需要特別注意的是,在寫入 Stream 前都需要執行 ready,以保證目前的 WritableStream 的狀態是可以接受下一個 chunk 傳入進來的

defaultWriter.ready.then(() => ...)

以上範例擷取了大概的流程以讓大家了解 WritableStream 的使用方式,但其實在建構 new WritableStream 時還可以傳入第二個參數,第二個參數中有 highWaterMark, size 等其他設定值,這部分牽涉到的是 Backpressure 的觀念,但因為較為深入就不在我們今天的討論範圍

const stream = new WritableStream(
  {
    start(controller) {},
    write(chunk, controller) {},
    close(controller) {},
    abort(reason) {},
  },
  {
    highWaterMark: 3,
    size: () => 1,
  },
);

以上關於 WritableStream 範例,完整的程式碼請參照 How writable streams work

TransformStream

TransformStream 可以讀取也可以寫入,類似中介者的腳色常用來進行不同的資料轉換,最常搭配 pipeThrough()pipeTo() 一起使用,丟入 pipeThrough() 中的參數就是 TransformStream,可以將資料轉換過後再丟到下一個步驟進行處理,類似 redux, expressmiddleware,而最後要將處理過的資料寫入則是會利用 pipeTo() 函式。

下面讓我們看看兩個例子比較能理解 TransformStream 的用法:

https://ithelp.ithome.com.tw/upload/images/20230926/2016268710TIP5lyWB.png

還記得上面講到 ReadableStream 時寫了一個 toJSON 的函式將 Stream 轉為 JSON 嗎?這一段邏輯可以改用 web 內置的 TextDecoderStream 讓程式碼更容易被覆用,這裡使用 pipeThrough 加上 TextDecoderStream,將原本寫的 解碼(TextDecoder) 邏輯改為讓 TextDecoderStream 這個 TransformStream 去做處理,讓整個架構的職責區分更為清楚

const response = await fetch(url);
// 讓 TextDecoderStream 處理解碼(TextDecoder)
const reader = response.body.pipeThrough(new TextDecoderStream()).getReader();
const rawJson = await toBuffer(reader);
const json = JSON.parse(rawJson.join(''));

async function toBuffer(reader) {
  const chunk = [];
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;

    chunk.push(value);
  }

  return chunk;
}

第二個例子是建立一個 TransformStream 實例,使用他能讀又能寫的功用,將資料流壓縮成 gzip,再回寫到 TransformStreamwritable 裡,這樣只要使用一遍 TransformStream 就將原本的 Stream 轉換成我們想要的格式

// Get from url1:
const response = await fetch(url1);
const { readable, writable } = new TransformStream();

// Compress the data from url1:
response.body.pipeThrough(new CompressionStream('gzip')).pipeTo(writable);

// Post to url2:
await fetch(url2, {
  method: 'POST',
  // 這裡的 readable 已經是被壓縮成 gzip 格式過後的了
  body: readable,
});

在 web worker 中使用 Stream

回到一開始這篇介紹 Stream 的原因,Stream 是一種 Transferable objects,這代表在處理影音或是串流這種檔案較大的媒體格式時,可以考慮把 Stream 轉移(transfer)worker 中做一些耗時的處理,最後再將想要的結果傳回主線程,讓處理 Stream 的過程不會影響到主線程的運行。

結論

以上大概就是 ReadableStream, WritableStream, TransformStream 的一些基本介紹,中間略過了很多複雜的概念,像是 stream lock, backpressure 等等,但我想以上的範例足夠幫助大家快速理解三種 Stream 的基本用法了。

補充小知識

  1. TextDecoder
    用來處理文字相關的編碼(TextEncoder)解碼(TextDecoder),包括常見的 utf-8utf-16big-5 等等,constructer 中可以傳入編碼格式,不傳的話預設為 utf-8

以下例子分別使用 encodedecode 兩個方法進行 utf-8 的編、解碼轉換。

// 編碼
const encoder = new TextEncoder();
const array = encoder.encode("€"); // Uint8Array(3) [226, 130, 172]

// 解碼
const decoder = new TextDecoder();
const str = decoder.decode(array); // String "€"
  1. body stream already read
    https://ithelp.ithome.com.tw/upload/images/20230926/20162687wRhrNMm8LU.png

有時候我們想要 log 出 api 的回傳結果時,可能會看到這個錯誤,原因是呼叫了兩次 res.json(),我們現在知道 res.body 實際上是 Stream,而 Stream 不能被操作兩次,所以才會出現這個錯誤訊息。

Reference

Streams API concepts
Streaming requests with the fetch API
从 Fetch 到 Streams —— 以流的角度处理网络请求
How to Convert JavaScript ReadableStream Object to JSON?
Using writable streams


上一篇
BroadcastChannel
下一篇
Transferable objects - ImageBitmap
系列文
網頁的另一個大腦:從基礎到進階掌握 Web Worker 技術30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言